package pb.wang.streaming.local

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.util.Random

/**
  * Created by peibin on 2017/6/13.
  */
/**
  * Demo: drive a Spark Streaming job from a programmatically fed RDD queue,
  * doing a different amount of work (an increasing number of `show` calls)
  * on each micro-batch.
  *
  * Created by peibin on 2017/6/13.
  */
object DynamicJobDemo {

  /** Sample record: event timestamp, ad id, impression count, click count. */
  final case class Test(timestamp: Long, ad: Int, imp: Int, clk: Int)

  def main(args: Array[String]): Unit = {
    // Counts batches seen so far. The AtomicInteger itself is mutable, so the
    // reference should be a val, not a var.
    val batchCounter = new AtomicInteger(1)
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    sc.setCheckpointDir("./checkpoint")
    // 6-second batch interval.
    val ssc = new StreamingContext(sc, Seconds(6))

    // Queue feeding the stream; RDDs appended below are consumed by batches.
    // oneAtATime = false: drain every queued RDD per batch, not just one.
    val lines = mutable.Queue[RDD[Test]]()
    val streams = ssc.queueStream(lines, oneAtATime = false)
    val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
    import sqlContext.implicits._

    streams.foreachRDD { rdd =>
      val t = batchCounter.getAndIncrement()
      println("xxxxxxxxxxxxxxxxxxxxxxx: " + t)
      // Show the frame t times — batch N does N units of work, demonstrating
      // dynamically growing per-batch jobs.
      (0 until t).foreach(_ => rdd.toDF.show(1))
    }

    // @volatile: written by the shutdown-hook thread, read by the feeding loop
    // below — without it the main thread may never observe the update.
    @volatile var flag = true
    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application at " + new Date())
      flag = false
      // Stop gracefully: finish processing queued data before shutting down.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped at " + new Date())
    }

    ssc.start()
    // Feed two random records per second for up to ~1000 seconds, stopping
    // early once the shutdown hook clears the flag. Plain for loop — the
    // original `yield` built an IndexedSeq[Unit] that was discarded.
    for (_ <- 0 to 1000 if flag) {
      val ts = new Date().getTime
      lines += sc.makeRDD(Seq(
        Test(ts, 10, Random.nextInt(10000), Random.nextInt(10)),
        Test(ts, 20, Random.nextInt(10000), Random.nextInt(20))))
      Thread.sleep(1000)
    }
    ssc.awaitTermination()
  }

}
